java - Spring Integration Aggregator
I want to use an aggregator to create one message out of 2 messages, but I'm not sure how to do this.
At the moment I'm reading in 2 files from a directory and want to aggregate those messages into one.
My whole project looks like this:
read in a .zip -> pass it to a custom message handler that unzips it into a directory -> read the files from this directory -> try to aggregate them
It would be great if I could send one message with 2 payloads right after unzipping the file, but aggregating after reading would suffice.
My unzipper looks like this:
    import java.io.File;
    import java.io.FileOutputStream;
    import java.io.InputStream;
    import java.util.Enumeration;
    import java.util.zip.ZipEntry;
    import java.util.zip.ZipFile;

    import org.springframework.integration.handler.AbstractMessageHandler;
    import org.springframework.messaging.Message;

    public class ZipHandler extends AbstractMessageHandler {

        File dat;
        File json;

        @Override
        protected void handleMessageInternal(Message<?> message) throws Exception {
            byte[] buffer = new byte[1024];
            try {
                // The payload is the .zip file that was read in
                File file = (File) message.getPayload();
                ZipFile zip = new ZipFile(file);
                for (Enumeration<? extends ZipEntry> entries = zip.entries(); entries.hasMoreElements();) {
                    ZipEntry ze = entries.nextElement();
                    String name = ze.getName();
                    if (name.endsWith(".dat") || name.endsWith(".DAT")) {
                        // Copy the .dat entry to the output directory
                        InputStream input = zip.getInputStream(ze);
                        File datFile = new File("d:/lrtrans/zipout" + File.separator + name);
                        FileOutputStream fos = new FileOutputStream(datFile);
                        int len;
                        while ((len = input.read(buffer)) > 0) {
                            fos.write(buffer, 0, len);
                        }
                        this.dat = datFile;
                        fos.close();
                    } else if (name.endsWith(".json") || name.endsWith(".JSON")) {
                        // Copy the .json entry to the output directory
                        InputStream input = zip.getInputStream(ze);
                        File jsonFile = new File("d:/lrtrans/zipout" + File.separator + name);
                        FileOutputStream fos = new FileOutputStream(jsonFile);
                        int len;
                        while ((len = input.read(buffer)) > 0) {
                            fos.write(buffer, 0, len);
                        }
                        this.json = jsonFile;
                        fos.close();
                    }
                }
                zip.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
It takes the files and puts them into 2 directories, from which I read them in again using a FileReadingMessageSource. I would also like to solve this using only annotation-based notation, not XML.
Edit:
I want to use a DefaultAggregatingMessageGroupProcessor with a correlation strategy based on a header called "zip" and a release strategy based on the message count, since in my case 2 files should be combined into one.
    @Aggregator(inputChannel = "toaggregatorchannel", outputChannel = "torouterchannel", discardChannel = "nullChannel")
    public DefaultAggregatingMessageGroupProcessor aggregate() {
        DefaultAggregatingMessageGroupProcessor aggregator = new DefaultAggregatingMessageGroupProcessor();
        return aggregator;
    }

    @CorrelationStrategy
    public String correlateBy(@Header("zipfile") String zip) {
        return zip;
    }

    @ReleaseStrategy
    public boolean isReadyToRelease(List<Message<?>> messages) {
        return messages.size() == 2;
    }
I'd say you are going the right way. Since a zip has several files in it, it is a correct requirement to unzip it, collect those files into one message, and send that on for further processing.
So, yes, <aggregator> is for you. There you need to determine how to correlate and group them.
I don't know how you unzip them, but you can use the zip file name as the correlationKey
and use the number of files as the group size to determine the signal to release the group.
Feel free to ask more questions. But first of all I need to see your "unzipper".
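To make the correlate-then-release idea concrete, here is a minimal plain-Java sketch of the mechanism. It is not Spring Integration code: the class GroupingSketch and its add method are hypothetical names made up for illustration, and it assumes the zip file name is the correlation key and a fixed group size is the release condition.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/** Hypothetical illustration only; not a Spring Integration class. */
public class GroupingSketch {

    // Pending groups, keyed by correlation key (here: the zip file name)
    private final Map<String, List<String>> groups = new HashMap<>();
    private final int releaseSize;

    public GroupingSketch(int releaseSize) {
        this.releaseSize = releaseSize;
    }

    /**
     * Adds a file name to the group of its zip. Returns the complete group
     * once it holds releaseSize entries, otherwise an empty Optional.
     */
    public Optional<List<String>> add(String zipName, String fileName) {
        List<String> group = groups.computeIfAbsent(zipName, k -> new ArrayList<>());
        group.add(fileName);
        if (group.size() < releaseSize) {
            return Optional.empty();   // keep waiting for the rest of the group
        }
        groups.remove(zipName);        // group is complete, so release it
        return Optional.of(group);
    }

    public static void main(String[] args) {
        GroupingSketch sketch = new GroupingSketch(2);
        System.out.println(sketch.add("a.zip", "a.dat"));
        System.out.println(sketch.add("a.zip", "a.json"));
    }
}
```

The aggregator does the same bookkeeping for you: the correlation strategy picks the group, and the release strategy decides when the group is complete.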
UPDATE
First of all, the annotation-based aggregator configuration is a bit limited, and it is better to use @ServiceActivator on an AggregatingMessageHandler @Bean to have more control over the options.
However, you can achieve your requirements even with your choice. The @Aggregator configuration should follow POJO method invocation principles:
    @Aggregator(inputChannel = "toaggregatorchannel", outputChannel = "torouterchannel", discardChannel = "nullChannel")
    public List<File> aggregate(List<File> files) {
        return files;
    }
Something like that.
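For comparison, the @ServiceActivator on an AggregatingMessageHandler @Bean variant mentioned above might look roughly like the sketch below. Treat it as an assumption-laden outline, not a tested configuration: the channel names and the "zipfile" header are taken from the question as written, the class name AggregatorConfig is made up, and HeaderAttributeCorrelationStrategy and MessageCountReleaseStrategy are my stand-ins for the @CorrelationStrategy and @ReleaseStrategy methods in the question.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.aggregator.AggregatingMessageHandler;
import org.springframework.integration.aggregator.DefaultAggregatingMessageGroupProcessor;
import org.springframework.integration.aggregator.HeaderAttributeCorrelationStrategy;
import org.springframework.integration.aggregator.MessageCountReleaseStrategy;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.store.SimpleMessageStore;
import org.springframework.messaging.MessageHandler;

@Configuration
@EnableIntegration
public class AggregatorConfig { // hypothetical class name

    @Bean
    @ServiceActivator(inputChannel = "toaggregatorchannel")
    public MessageHandler aggregator() {
        // The default processor collects the payloads of a group into one List payload
        AggregatingMessageHandler handler = new AggregatingMessageHandler(
                new DefaultAggregatingMessageGroupProcessor(), new SimpleMessageStore());
        // Group messages by the value of the "zipfile" header (name assumed from the question)
        handler.setCorrelationStrategy(new HeaderAttributeCorrelationStrategy("zipfile"));
        // Release a group as soon as it contains 2 messages
        handler.setReleaseStrategy(new MessageCountReleaseStrategy(2));
        // Output and discard channels are set on the handler itself, not on the annotation
        handler.setOutputChannelName("torouterchannel");
        handler.setDiscardChannelName("nullChannel");
        // Remove completed groups so the same zip name can be aggregated again later
        handler.setExpireGroupsUponCompletion(true);
        return handler;
    }
}
```

Both files have to arrive with the same "zipfile" header value for this to group them, so that header needs to be added somewhere between the unzipper and the aggregator.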